package component

import (
	"fmt"
	"time"

	"gitee.com/chenhonghua/ginorigin/config/http/restful"
	"gitee.com/chenhonghua/ginorigin/config/http/router"
	"gitee.com/chenhonghua/ginorigin/config/storage/kafka"
	"gitee.com/chenhonghua/ginorigin/config/system/zap"

	"github.com/Shopify/sarama"
	"github.com/gin-gonic/gin"
)

// topics lists the Kafka topic names used by this demo, in the order the
// handler subscribes to them and product publishes to them.
var topics = []string{
	"tp001",
	"tp002",
	"tp003",
}

// componentKafkaDemo is the field-less receiver type for the Kafka demo: its
// methods register the demo route, serve the HTTP request, and produce and
// consume the demo messages.
type componentKafkaDemo struct{}

// load registers the Kafka demo endpoint: a GET handler at "kafka/test1" on
// the routes that router.GetIRoutes returns for "examples/component".
func (ckd componentKafkaDemo) load() {
	router.GetIRoutes("examples/component").
		GET("kafka/test1", ckd.componentKafkaDemoApiHandler1)
}

// componentKafkaDemoApiHandler1 is the Kafka component test endpoint.
//
// It subscribes to every topic in topics via consume, publishes the demo
// messages via product, and then writes a success response. product sleeps
// between sends, so the response is only written after it has finished.
// NOTE(review): every request subscribes again; whether repeated
// subscriptions are safe depends on kafka.Sub, which is not visible here.
//
// The @-annotations below look like swag (swaggo) API-doc annotations; their
// text is kept verbatim because it feeds generated documentation.
// @Tags component
// @Summary 组件测试：kafka
// @Produce  application/json
// @Success 200 {object} restful.Response{data=map[string]interface{},msg=string} "组件测试：kafka"
// @Router /examples/component/kafka/test1 [get]
func (ckd componentKafkaDemo) componentKafkaDemoApiHandler1(c *gin.Context) {
	// Range over topics rather than indexing 0..2 so that the handler keeps
	// working, and covers every entry, if the topic list ever changes length.
	for _, topic := range topics {
		ckd.consume(topic)
	}
	ckd.product()
	restful.Success.WithMessage(c, "样例接口1")
}

// product publishes timestamped demo messages to every topic in topics.
//
// It runs 10 rounds. Each round sleeps one second and then sends one message
// per topic, in slice order; each message body is a fixed prefix followed by
// the current time and a newline. After the last round it sleeps one more
// second, so the call blocks for roughly 11 seconds in total.
// NOTE(review): the trailing sleep presumably gives consumers time to print
// the final messages — confirm.
func (componentKafkaDemo) product() {
	const rounds = 10
	for i := 0; i < rounds; i++ {
		time.Sleep(time.Second)
		for _, topic := range topics {
			// msg is a local variable so the call works whether SendMessage has a
			// value or a pointer receiver.
			// NOTE(review): any result of SendMessage is not checked here; its
			// signature is not visible in this file.
			msg := kafka.Msg{Topic: topic, Message: "消息：" + time.Now().String() + "\n"}
			msg.SendMessage()
		}
	}
	time.Sleep(time.Second)
}

// consume subscribes to topic with a callback that prints each received
// message's value, prefixed by the topic name, to standard output.
//
// The error returned by Subcribe is handed to zap.PanicIfErr.
func (componentKafkaDemo) consume(topic string) {
	// onMessage is invoked with each message delivered for the subscription.
	onMessage := func(received *sarama.ConsumerMessage) {
		// %s formats a byte slice as its text, so no string conversion is needed.
		fmt.Printf("topic[%s]收到消息：%s", topic, received.Value)
	}

	sub := kafka.Sub{
		Topic: topic,
		Event: onMessage,
	}
	zap.PanicIfErr(sub.Subcribe())
}
